package com.jackson.framerocketmq.producer;

/*
 * Author: JacksonCoder
 * FileName: TransactionProducer
 * Version: 1.0.0
 * Date: 2025/4/18
 */

import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * Demo producer that sends one transactional message per tag and decides the
 * outcome of each local transaction from the message tag:
 * <ul>
 *   <li>{@code TAGA} - commit</li>
 *   <li>{@code TAGB} - roll back</li>
 *   <li>any other tag, or no tag - unknown, leaving the outcome to the check-back</li>
 * </ul>
 */
public class TransactionProducer {
    /**
     * Configures and starts a {@link TransactionMQProducer} against
     * {@code localhost:9876}, then sends three transactional messages to topic
     * {@code test1}, printing each send result.
     *
     * @param args unused
     * @throws MQClientException if the producer cannot be started or a send fails
     * @throws RemotingException declared for caller compatibility
     * @throws InterruptedException declared for caller compatibility
     * @throws MQBrokerException declared for caller compatibility
     */
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        TransactionMQProducer producer = new TransactionMQProducer();
        producer.setNamesrvAddr("localhost:9876");
        producer.setProducerGroup("test1");
        producer.setTransactionListener(new TransactionListener() {

            /**
             * Executes the local transaction for a half message; here the
             * outcome is derived purely from the message tag.
             *
             * @param message the message being sent in the transaction
             * @param arg     the user argument passed to {@code sendMessageInTransaction}; unused
             * @return the resulting local transaction state
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
                String tags = message.getTags();
                // Constant-first equals: a message without a tag falls through
                // to UNKNOW instead of throwing a NullPointerException.
                if ("TAGA".equals(tags)) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                if ("TAGB".equals(tags)) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                return LocalTransactionState.UNKNOW;
            }

            /**
             * Transaction check-back: called for a message whose transaction
             * the producer has neither committed nor rolled back. This demo
             * always resolves such messages as committed.
             *
             * @param messageExt the message whose transaction state is being checked
             * @return always {@link LocalTransactionState#COMMIT_MESSAGE}
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("消息进入回查");
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        producer.start();

        // The producer is deliberately not shut down right after sending, so
        // that check-backs for unresolved messages can still be answered.
        // Release its resources when the JVM is terminated instead.
        Runtime.getRuntime().addShutdownHook(new Thread(producer::shutdown));

        String[] tags = new String[]{"TAGA", "TAGB", "TAGC"};
        for (String tag : tags) {
            // Explicit charset so the payload bytes do not depend on the platform default.
            Message message = new Message("test1", tag, "hello world".getBytes(StandardCharsets.UTF_8));
            TransactionSendResult result = producer.sendMessageInTransaction(message, null);

            System.out.println("发送获取结果:" + result);
        }
    }
}
